/*
 * Cainiao.com Inc.
 * Copyright (c) 2013-2021 All Rights Reserved.
 */

package com.gitee.suveng.batch;

import java.net.URISyntaxException;
import java.net.URL;

import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.StaticLog;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count: reads a text file from the classpath with the Flink DataSet API,
 * splits every line on single spaces and prints how often each word occurs.
 *
 * @author suwenguang
 * @since 2021-01-21 19:27
 */
public class WordCount {

	/** Classpath-relative location of the input text file. */
	private static final String INPUT_RESOURCE = "data/data.txt";

	/**
	 * Builds and runs the word-count job, printing the resulting (word, count) pairs.
	 *
	 * @param args command-line arguments; not used
	 * @throws IllegalStateException if the input resource cannot be resolved or the job fails
	 */
	public static void main(String[] args) {
		// Batch processing: the input is a bounded data set.
		ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

		// Read the input file line by line.
		DataSource<String> lines = executionEnvironment.readTextFile(resolveInputPath(INPUT_RESOURCE));

		// Expand every line into (word, 1) pairs, then group by the word (field 0)
		// and sum the counts (field 1).
		FlatMapOperator<String, Tuple2<String, Integer>> wordOnes = lines.flatMap(new Tokenizer());
		DataSet<Tuple2<String, Integer>> counts = wordOnes.groupBy(0).sum(1);

		try {
			counts.print();
		} catch (Exception e) {
			// Propagate the failure with its cause instead of printing the stack trace and
			// returning normally, so a failed job is not mistaken for a successful one.
			throw new IllegalStateException("Word count job failed", e);
		}
	}

	/**
	 * Resolves a classpath resource to a file-system path string.
	 *
	 * @param resourceName classpath-relative resource name
	 * @return the decoded path of the resource
	 * @throws IllegalStateException if the resource does not exist, its URL is not a valid URI,
	 *                               or it has no path component (e.g. it lives inside a jar)
	 */
	private static String resolveInputPath(String resourceName) {
		URL resource = ResourceUtil.getResource(resourceName);
		if (resource == null) {
			throw new IllegalStateException("Input resource not found on classpath: " + resourceName);
		}
		try {
			// URL#getPath() keeps percent-escapes such as "%20"; URI#getPath() returns the
			// decoded path, which is what a file-system lookup needs.
			String path = resource.toURI().getPath();
			if (path == null) {
				// Opaque URIs (for example "jar:file:...!/...") have no path component.
				throw new IllegalStateException("Input resource is not a plain file: " + resource);
			}
			return path;
		} catch (URISyntaxException e) {
			throw new IllegalStateException("Invalid input resource URL: " + resource, e);
		}
	}

	/**
	 * Splits a line on single spaces and emits a (word, 1) pair for every non-empty token.
	 */
	private static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

		private static final long serialVersionUID = 1L;

		/**
		 * @param value one input line
		 * @param out   collector receiving one (word, 1) pair per non-empty token
		 */
		@Override
		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			StaticLog.info("value:{}", value);

			for (String word : value.split(" ")) {
				// Blank lines and leading/consecutive spaces produce empty tokens;
				// those are not words and must not be counted.
				if (StrUtil.isEmpty(word)) {
					continue;
				}
				out.collect(new Tuple2<>(word, 1));
			}
		}
	}

}
